package org.example.mqtt.demo1.config;

import org.example.mqtt.demo1.message.MqttReceiver;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Created by way on 2017/12/22.
 */
@Configuration
@Component
public class MqttConfig {


    //    @Value("${mqtt.client.id}")
    private String mqttClientId = UUID.randomUUID().toString();

    @Value("${mqtt.server.flag}")
    private boolean mqttServerFlag;


    public static String mqttClientTopicRoot = "client";

    public static String mqttServerTopicRoot = "server";

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
//        factory.setUserName("guest");
//        factory.setPassword("guest");
        return factory;
    }

    //publisher ---------------------

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publish_" + mqttClientId, mqttClientFactory());
        messageHandler.setAsync(true);
        if (mqttServerFlag) {
            //服务器默认广播给所有client
            messageHandler.setDefaultTopic(mqttClientTopicRoot);
        } else {
            //客户端发送给服务器,且topic带上自己的标识
            messageHandler.setDefaultTopic(mqttServerTopicRoot + "/" + mqttClientId);
        }
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    //receiver ---------------------------
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter;
        //客戶段只訂閲自己clientId的消息
        if (mqttServerFlag) {
            //服务器只接受server的消息
            adapter =
                    new MqttPahoMessageDrivenChannelAdapter("receiver_" + mqttClientId, mqttClientFactory(),
                            mqttServerTopicRoot + "/#");
        } else {
            adapter =
                    new MqttPahoMessageDrivenChannelAdapter("receiver_" + mqttClientId, mqttClientFactory(),
                            mqttClientTopicRoot, mqttClientTopicRoot + "/" + mqttClientId);
        }
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MqttReceiver();
    }

    public boolean isMqttServerFlag() {
        return mqttServerFlag;
    }

    public String getClientId() {
        return mqttClientId;
    }
}
